Optimize subclasses of DummyOperator for Scheduling#12745
Optimize subclasses of DummyOperator for Scheduling#12745kaxil merged 5 commits intoapache:masterfrom
Conversation
Custom operators inheriting from DummyOperator will now instead of going to a scheduled state will go set straight to success if they don't have callbacks set. closes apache#11393
| @classmethod | ||
| def _is_inherited_from_dummy_operator(cls, op: BaseOperator) -> bool: | ||
| """Used to determine if an Operator is inherited from DummyOperator""" | ||
| if op.task_type == "DummyOperator" or isinstance(op, DummyOperator): |
There was a problem hiding this comment.
I did not add check to see if the execute method is empty or not like we discussed @ashb. My thinking for that was as it is a DummyOperator where we list it should do nothing, let's just keep it simple.
WDYT?
There was a problem hiding this comment.
Should we just simply do isinstance(op, DummyOperator), why check the task_type? Anyway, we can just do:
return op.task_type == "DummyOperator" or isinstance(op, DummyOperator)
There was a problem hiding this comment.
One for real dag, one for Serialzed dag I think
There was a problem hiding this comment.
How about instead of this approach,
Add a method/propery on to BaseOperator:
class BaseOperator:
@property
def inherits_from_dummy_operator(self):
getattr(self, '_is_dummy', False)
And then
class DummyOperator(BaseOperator):
inherits_from_dummy_operator = True
That way this method isn't needed, and at runtime in both cases (real or serialized) we can just look at `op.inherits_from_dummy_operator
There was a problem hiding this comment.
Using the approach I suggested (if it works), you can just get rid of this method :)
There was a problem hiding this comment.
Works like a charm, thanks.
Updated in 0b47a1b
|
|
||
| dag = DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once') | ||
|
|
||
|
|
There was a problem hiding this comment.
This DAG is used by test_should_mark_dummy_task_as_success test in tests/jobs/test_scheduler_job.py
airflow/models/dagrun.py
Outdated
| for ti in schedulable_tis | ||
| if ( | ||
| ti.task.task_type == "DummyOperator" | ||
| (ti.task.task_type == "DummyOperator" or getattr(ti.task, "_is_dummy", False)) |
There was a problem hiding this comment.
| (ti.task.task_type == "DummyOperator" or getattr(ti.task, "_is_dummy", False)) | |
| (getattr(ti.task, "_is_dummy", False) |
Should this be enough? I think DummyOperator will also fall in this category
There was a problem hiding this comment.
This is just for backwards-compatibility so the current Serialized DAGs which don't have _is_dummy field still continue to work
There was a problem hiding this comment.
I think right now this could be used in Scheduler (on serialized dags) and Backfill (on real DAGs) so that commit alone won't help.
| setattr(op, field, None) | ||
|
|
||
| # Used to determine if an Operator is inherited from DummyOperator | ||
| setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy"))) |
There was a problem hiding this comment.
| setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy"))) | |
| setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy", False))) |
To load existing s10n blobs.
ashb
left a comment
There was a problem hiding this comment.
Add a comment, but pre-emptively approve.
|
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
Custom operators inheriting from DummyOperator will now instead
of going to a scheduled state will go set straight to success
if they don't have callbacks set.
closes #11393
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.